package com.example.demo.stream;

import com.example.demo.entity.UserBean;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.util.ArrayList;
import java.util.Arrays;

/**
 * Demo of the individual Flink DataStream operators
 * (aggregate, filter, flatMap, join, keyBy, map, reduce, side output, union).
 */
public class DataStreamOperator {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String type = parameterTool.get("type", "0");
        switch (type) {
            case "1":
                aggregate(env);
                break;
            case "2":
                flatMap(env);
                break;
            case "3":
                fliter(env);
                break;
            case "4":
                join(env);
                break;
            case "5":
                keyBy(env);
                break;
            case "6":
                map(env);
                break;
            case "7":
                reduce(env);
                break;
            case "8":
                sideOutput(env);
                break;
            case "9":
                union(env);
                break;
            default:
                throw new RuntimeException("算子类型错误！");
        }
        env.execute("flink job operator");
    }

    /**
     * aggregate filter flatmap fold keyby map reduce
     * Summary:
     * Aggregate: min()、minBy()、max()、maxBy() 滚动聚合并输出每次滚动聚合后的结果
     */
    private static void aggregate(StreamExecutionEnvironment env) {
        // 输入: 用户行为。某个用户在某个时刻点击或浏览了某个商品，以及商品的价格。
        ArrayList<UserBean> userBeans = new ArrayList<>();
        UserBean userbeanlog1 = new UserBean();
        userbeanlog1.setUserID("userID1");
        userbeanlog1.setProductID("productID3");
        userbeanlog1.setProductPrice(10);
        userBeans.add(userbeanlog1);

        UserBean userbeanlog2 = new UserBean();
        userbeanlog2.setUserID("userID2");
        userbeanlog2.setProductPrice(10);
        userBeans.add(userbeanlog2);

        UserBean userbeanlog3 = new UserBean();
        userbeanlog3.setUserID("userID1");
        userbeanlog3.setProductID("productID5");
        userbeanlog3.setProductPrice(30);
        userBeans.add(userbeanlog3);

        DataStreamSource<UserBean> source = env.fromCollection(userBeans);

        // 转换: KeyBy对数据重分区
        // 这里, UserActionLog是POJO类型,也可通过keyBy("userID")进行分区
        KeyedStream<UserBean, String> keyedStream = source.keyBy((KeySelector<UserBean, String>) UserBean::getUserID);
        //userid1 10 30 userid2 10
        // 转换: Aggregate并输出
        // 滚动求和并输出
        keyedStream.sum("productPrice").print();
        // 滚动求最大值并输出
        keyedStream.max("productPrice").print();
        // 滚动求最大值并输出
        keyedStream.maxBy("productPrice").print();
        // 滚动求最小值并输出
        keyedStream.min("productPrice").print();
        // 滚动求最小值并输出
        keyedStream.minBy("productPrice").print();
    }


    /**
     * Summary:
     * union: 将多个流合并到一个流中，以便对合并的流进行统一处理，
     * 有点类似于Storm中的将上一级的两个Bolt数据汇聚到这一级同一个Bolt中。注意，合并的流类型需要一致
     */
    private static void union(StreamExecutionEnvironment env) {
        DataStream<Tuple1<String>> dataStream1 = env.fromElements(
                Tuple1.of("flink"),
                Tuple1.of("spark"),
                Tuple1.of("hadoop")
        );
        DataStream<Tuple1<String>> dataStream2 = env.fromElements(
                Tuple1.of("oracle"),
                Tuple1.of("mysql"),
                Tuple1.of("sqlserver")
        );

        dataStream1.union(dataStream2).print();
    }
//    private static void split(StreamExecutionEnvironment env) {
//        DataStream<Integer> dataStream=env.fromElements(1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16);
//        //定义拆分逻辑
//        SplitStream<Integer> split=dataStream.split(new OutputSelector<Integer>() {
//            @Override
//            public Iterable<String> select(Integer integer) {
//                List<String> output=new ArrayList<>();
//                if(integer%2==0){
//                    //偶数
//                    output.add("even");
//                }else {
//                    //奇数
//                    output.add("odd");
//                }
//                return output;
//            }
//        });
//
//        //选择一个或多个切分后的流
//        DataStream<Integer> evenStream=split.select("even");
//
////        DataStream<Integer> oddStream=split.select("odd");
////        DataStream<Integer> moreStream =split.select("odd","even");
//        evenStream.print().setParallelism(1);
//    }

    /**
     * Summary:
     * Side-Output: Split...Select...已经过时，推荐使用更灵活的侧路输出(Side-Output)，如下。
     * Side-Output是从Flink 1.3.0开始提供的功能，支持了更灵活的多路输出，包括使用RichProcessFunction。
     * Side-Output可以以侧流的形式，以不同于主流的数据类型，向下游输出指定条件的数据、异常数据、迟到数据等等
     */
    private static void sideOutput(StreamExecutionEnvironment env) {
        DataStream<UserBean> dataStream = env.fromElements(
                new UserBean("userID1", 1293984000, "click", "productID1", 10),
                new UserBean("userID1", 1293984000, "browse", "productID1", 10),
                new UserBean("userID2", 1292952000, "browse", "productID2", 20),
                new UserBean("userID2", 1243983000, "click", "productID2", 15),
                new UserBean("userID3", 1293184000, "click", "productID1", 30)
        );
        // 定义OutputTag
        final OutputTag<UserBean> clickTag = new OutputTag<>("click-Tag", TypeInformation.of(UserBean.class));
//        final OutputTag<UserBean> browseTag= new OutputTag<>("browse-Tag", TypeInformation.of(UserBean.class));

        // 在ProcessFunction中处理主流和分流
        SingleOutputStreamOperator<UserBean> processedStream =
                dataStream.process(new ProcessFunction<UserBean, UserBean>() {
                    @Override
                    public void processElement(UserBean userAction, Context context, Collector<UserBean> collector) {
                        // 测流-只输出特定的数据
                        if (userAction.getEventType().equals("click")) {
                            context.output(clickTag, userAction);
                        } else {
                            // 主流
                            collector.collect(userAction);
                        }
                    }
                });
        // 获取主流
        processedStream.print("主流输出B：");
        // 获取分流
        DataStream<UserBean> sideOutput = processedStream.getSideOutput(clickTag);
        sideOutput.print("分流输出A ");
    }

    /**
     * Summary:
     * Reduce: 基于ReduceFunction进行滚动聚合，并向下游算子输出每次滚动聚合后的结果。
     */
    private static void reduce(StreamExecutionEnvironment env) {
        // 输入: 用户行为。某个用户在某个时刻点击或浏览了某个商品，以及商品的价格。
        DataStreamSource<UserBean> source = env.fromCollection(Arrays.asList(
                new UserBean("userID1", 1293984000, "click", "productID1", 10),
                new UserBean("userID2", 1293984001, "browse", "productID2", 8),
                new UserBean("userID2", 1293984002, "browse", "productID2", 8),
                new UserBean("userID2", 1293984003, "browse", "productID2", 8),
                new UserBean("userID1", 1293984002, "click", "productID1", 10),
                new UserBean("userID1", 1293984003, "click", "productID3", 10),
                new UserBean("userID1", 1293984004, "click", "productID1", 10)
        ));

        // 转换: KeyBy对数据重分区
        KeyedStream<UserBean, String> keyedStream = source.keyBy((KeySelector<UserBean, String>) UserBean::getUserID);

        // 转换: Reduce滚动聚合。这里,滚动聚合每个用户对应的商品总价格。
        SingleOutputStreamOperator<UserBean> result = keyedStream.reduce((ReduceFunction<UserBean>) (value1, value2) -> {
            int newProductPrice = value1.getProductPrice() + value2.getProductPrice();
            return new UserBean(value1.getUserID(), -1, "", "", newProductPrice);
        });
        result.print();
    }

    /**
     * Summary:
     * Map: 一对一转换
     */
    private static void map(StreamExecutionEnvironment env) {
        // 输入: 用户行为。某个用户在某个时刻点击或浏览了某个商品，以及商品的价格。
        DataStreamSource<UserBean> source = env.fromCollection(Arrays.asList(
                new UserBean("userID1", 1293984000, "click", "productID1", 10),
                new UserBean("userID2", 1293984001, "browse", "productID2", 8),
                new UserBean("userID1", 1293984002, "click", "productID1", 10)
        ));

        // 转换: 商品的价格乘以8
        SingleOutputStreamOperator<UserBean> result = source.map((MapFunction<UserBean, UserBean>) value -> {
            int newPrice = value.getProductPrice() * 8;
            return new UserBean(value.getUserID(), value.getEventTime(), value.getEventType(), value.getProductID(), newPrice);
        });
        result.print();
    }

    /**
     * Summary:
     * KeyBy: 按指定的Key对数据重分区。将同一Key的数据放到同一个分区。
     */
    private static void keyBy(StreamExecutionEnvironment env) {
        env.setParallelism(1);

        // 输入: 用户行为。某个用户在某个时刻点击或浏览了某个商品，以及商品的价格。
        DataStreamSource<UserBean> source = env.fromCollection(Arrays.asList(
                new UserBean("userID1", 1293984000, "click", "productID1", 10),
                new UserBean("userID2", 1293984001, "browse", "productID2", 8),
                new UserBean("userID1", 1293984002, "click", "productID1", 10),
                new UserBean("userID3", 1293984002, "click", "productID1", 10),
                new UserBean("userID1", 1293984002, "click", "productID1", 10),
                new UserBean("userID2", 1293984002, "click", "productID1", 10)
        ));

        // 转换: 按指定的Key(这里,用户ID)对数据重分区，将相同Key(用户ID)的数据分到同一个分区
        KeyedStream<UserBean, String> result = source.keyBy((KeySelector<UserBean, String>) UserBean::getUserID);
        result.print().setParallelism(4);
    }

    /**
     * Summary:
     * join: 根据指定的Key将两个流进行关联。
     */
    private static void join(StreamExecutionEnvironment env) {
        //2.定义加载或创建数据源（source）,监听9000端口的socket消息
        DataStream<UserBean> textStream1 = env.fromElements(
                new UserBean("userID1", 1293914003, "browse", "productID2", 8),
                new UserBean("userID1", 1292984002, "click", "productID1", 10),
                new UserBean("userID3", 1293384003, "click", "productID3", 10)
        );
        DataStream<UserBean> textStream2 = env.fromElements(
                new UserBean("userID1", 1293984003, "browse", "productID2", 8),
                new UserBean("userID1", 1293924002, "click", "productID1", 10),
                new UserBean("userID3", 1293914103, "click", "productID1", 10)
        );
        //将输入处理一下，变为tuple2
        DataStream<UserBean> mapStream1 = textStream1
                .map((MapFunction<UserBean, UserBean>) userAction -> {
                    //userAction.setProductID("mapStream1");
                    return userAction;
                });

        DataStream<UserBean> mapStream2 = textStream2
                .map((MapFunction<UserBean, UserBean>) userAction -> {
                    //userAction.setProductID("mapStream2");
                    return userAction;
                });

        //3.两个流进行join操作，是inner join，关联上的才能保留下来
        DataStream<String> result = mapStream1.join(mapStream2)
                //关联条件，以第0列关联（两个source输入的字符串）
                .where(UserBean::getUserID).equalTo(UserBean::getUserID)
                //以处理时间，每10秒一个滚动窗口
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                //关联后输出
                .apply((t1, t2) -> t1 + "|" + t2);
        //4.打印输出sink
        result.print();
    }

//    /**
//     * Summary:
//     *      Fold: 基于初始值和自定义的FoldFunction滚动折叠后发出新值
//     */
//    private static void fold(StreamExecutionEnvironment env) {
//        // 输入: 用户行为。某个用户在某个时刻点击或浏览了某个商品，以及商品的价格。
//        DataStreamSource<UserBean> source = env.fromCollection(Arrays.asList(
//                new UserBean("userID1", 1293984000, "click", "productID1", 10),
//                new UserBean("userID2", 1293984001, "browse", "productID2", 8),
//                new UserBean("userID2", 1293984002, "browse", "productID2", 8),
//                new UserBean("userID2", 1293984003, "browse", "productID2", 8),
//                new UserBean("userID1", 1293984002, "click", "productID1", 10),
//                new UserBean("userID1", 1293984003, "click", "productID3", 10),
//                new UserBean("userID1", 1293984004, "click", "productID1", 10)
//        ));
//
//        // 转换: KeyBy对数据重分区
//        KeyedStream<UserBean, String> keyedStream = source.keyBy(new
//        elector<UserBean, String>() {
//            @Override
//            public String getKey(UserBean value) throws Exception {
//                return value.getUserID();
//            }
//        });
//
//        // 转换: Fold 基于初始值和FoldFunction滚动折叠
//        SingleOutputStreamOperator<String> result = keyedStream.fold("浏览的商品及价格:", new FoldFunction<UserBean, String>() {
//            @Override
//            public String fold(String accumulator, UserBean value) throws Exception {
//                if(accumulator.startsWith("userID")){
//                    return accumulator + " -> " + value.getProductID()+":"+value.getProductPrice();
//                }else {
//                    return value.getUserID()+" " +accumulator + " -> " + value.getProductID()+":"+value.getProductPrice();
//                }
//            }
//        });
//
//        result.print();
//    }

    /**
     * Summary:
     * FlatMap: 一行变任意行(0~多行)
     */
    private static void flatMap(StreamExecutionEnvironment env) {
        // 输入: 英文电影台词
        DataStreamSource<String> source = env
                .fromElements(
                        "You jump I jump",
                        "Life was like a box of chocolates");
        // 转换: 将包含chocolates的句子转换为每行一个单词
        SingleOutputStreamOperator<String> result = source.flatMap((FlatMapFunction<String, String>) (value, out) -> {
            if (value.contains("chocolates")) {
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(word);
                }
            }
        });
        result.print();
    }

    /**
     * Summary:
     * Fliter: 过滤出需要的数据
     */
    private static void fliter(StreamExecutionEnvironment env) {
        // 输入: 用户行为。某个用户在某个时刻点击或浏览了某个商品，以及商品的价格。
        DataStreamSource<UserBean> source = env.fromCollection(Arrays.asList(
                new UserBean("userID1", 1293984000, "click", "productID1", 10),
                new UserBean("userID2", 1293984001, "browse", "productID2", 8),
                new UserBean("userID1", 1293984002, "click", "productID1", 10)
        ));
        // 过滤: 过滤出用户ID为userID1的用户行为
        SingleOutputStreamOperator<UserBean> result = source.filter((FilterFunction<UserBean>) value -> value.getUserID().equals("userID1"));
        result.print();
    }

}

